"""Manage a volatile in-memory filesystem.
"""
from __future__ import absolute_import
from __future__ import unicode_literals

import contextlib
import io
import os
import time
import typing
from collections import OrderedDict
from threading import RLock

import six

from . import errors
from .base import FS
from .enums import ResourceType, Seek
from .info import Info
from .mode import Mode
from .path import iteratepath
from .path import normpath
from .path import split
from ._typing import overload

if typing.TYPE_CHECKING:
    from typing import (
        Any,
        BinaryIO,
        Collection,
        Dict,
        Iterator,
        List,
        Optional,
        SupportsInt,
        Union,
        Text,
    )
    from .base import _OpendirFactory
    from .info import RawInfo
    from .permissions import Permissions
    from .subfs import SubFS

    _M = typing.TypeVar("_M", bound="MemoryFS")


@six.python_2_unicode_compatible
class _MemoryFile(io.RawIOBase):
    """A binary file handle onto the data held by a single `_DirEntry`.

    Every handle opened on the same entry works on the entry's one
    ``io.BytesIO`` buffer. Each handle therefore tracks its own position
    in ``self.pos`` and, while holding the entry lock, re-seeks the
    buffer before touching it (see `_seek_lock`).
    """

    def __init__(self, path, memory_fs, mode, dir_entry):
        # type: (Text, MemoryFS, Text, _DirEntry) -> None
        """Create a handle for ``dir_entry`` opened with ``mode``.

        Arguments:
            path (str): Path of the file, only used by `__str__`.
            memory_fs (MemoryFS): The filesystem owning the entry.
            mode (str): The mode string; it is wrapped in a `Mode`.
            dir_entry (_DirEntry): The file entry holding the data.

        """
        super(_MemoryFile, self).__init__()
        self._path = path
        self._memory_fs = memory_fs
        self._mode = Mode(mode)
        self._dir_entry = dir_entry

        # We are opening a file - dir_entry.bytes_file is not None
        self._bytes_io = typing.cast(io.BytesIO, dir_entry.bytes_file)

        self.accessed_time = time.time()
        self.modified_time = time.time()
        self.pos = 0

        if self._mode.truncate:
            # Discard any existing content.
            with self._dir_entry.lock:
                self._bytes_io.seek(0)
                self._bytes_io.truncate()
        elif self._mode.appending:
            # Start positioned at the current end of the data.
            with self._dir_entry.lock:
                self._bytes_io.seek(0, os.SEEK_END)
                self.pos = self._bytes_io.tell()

    def __str__(self):
        # type: () -> str
        _template = "<memoryfile '{path}' '{mode}'>"
        return _template.format(path=self._path, mode=self._mode)

    @contextlib.contextmanager
    def _seek_lock(self):
        # type: () -> Iterator[None]
        """Hold the entry lock with the buffer positioned at ``self.pos``.

        On a normal exit the buffer position is stored back in
        ``self.pos``. If the body raises, ``self.pos`` is left untouched.
        """
        with self._dir_entry.lock:
            self._bytes_io.seek(self.pos)
            yield
            self.pos = self._bytes_io.tell()

    def _check_readable(self):
        # type: () -> None
        """Raise `IOError` unless the handle was opened for reading."""
        if not self._mode.reading:
            raise IOError("File not open for reading")

    def _check_writable(self):
        # type: () -> None
        """Raise `IOError` unless the handle was opened for writing."""
        if not self._mode.writing:
            raise IOError("File not open for writing")

    def on_modify(self):  # noqa: D401
        # type: () -> None
        """Called when file data is modified.
        """
        self._dir_entry.modified_time = self.modified_time = time.time()

    def on_access(self):  # noqa: D401
        # type: () -> None
        """Called when file is accessed.
        """
        self._dir_entry.accessed_time = self.accessed_time = time.time()

    def flush(self):
        # type: () -> None
        """Do nothing: data is written straight to the in-memory buffer."""
        pass

    def __iter__(self):
        # type: () -> typing.Iterator[bytes]
        """Yield the remaining lines of the file, one at a time.

        Each line goes through `readline`, so the buffer is re-seeked
        under the entry lock for every line and ``self.pos`` advances as
        lines are consumed. Iterating the buffer directly would break as
        soon as another handle on the same entry moved its cursor.
        """
        while True:
            line = self.readline()
            if not line:
                return
            yield line

    def next(self):
        # type: () -> bytes
        """Return the next line, raising `StopIteration` at end of file."""
        self._check_readable()
        with self._seek_lock():
            self.on_access()
            return next(self._bytes_io)

    __next__ = next

    def readline(self, size=-1):
        # type: (int) -> bytes
        """Read and return one line, at most ``size`` bytes if given.

        Raises:
            IOError: If the file was not opened for reading.

        """
        self._check_readable()
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readline(size)

    def close(self):
        # type: () -> None
        """Close the handle and unregister it from its directory entry."""
        if not self.closed:
            with self._dir_entry.lock:
                self._dir_entry.remove_open_file(self)
                super(_MemoryFile, self).close()

    def read(self, size=-1):
        # type: (Optional[int]) -> bytes
        """Read up to ``size`` bytes from the current position.

        Raises:
            IOError: If the file was not opened for reading.

        """
        self._check_readable()
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.read(size)

    def readable(self):
        # type: () -> bool
        """Return `True` if the handle was opened for reading."""
        return self._mode.reading

    def readinto(self, buffer):
        # type: (bytearray) -> Optional[int]
        """Read bytes into ``buffer`` and return how many were read.

        Raises:
            IOError: If the file was not opened for reading.

        """
        self._check_readable()
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readinto(buffer)

    def readlines(self, hint=-1):
        # type: (int) -> List[bytes]
        """Read and return the remaining lines as a list.

        Raises:
            IOError: If the file was not opened for reading.

        """
        self._check_readable()
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.readlines(hint)

    def seekable(self):
        # type: () -> bool
        """Return `True`: in-memory files always support seeking."""
        return True

    def seek(self, pos, whence=Seek.set):
        # type: (int, SupportsInt) -> int
        """Move the handle position and return the new position."""
        # NOTE(@althonos): allows passing both Seek.set and os.SEEK_SET
        with self._seek_lock():
            self.on_access()
            return self._bytes_io.seek(pos, int(whence))

    def tell(self):
        # type: () -> int
        """Return the current position of this handle."""
        return self.pos

    def truncate(self, size=None):
        # type: (Optional[int]) -> int
        """Resize the file to ``size`` bytes (default: current position).

        When ``size`` is larger than the current data, the file is
        padded with null bytes. The position of the handle is left
        unchanged in every case.

        Returns:
            int: ``size`` when it is given and non-zero, otherwise the
            value returned by the underlying buffer's ``truncate``.

        Raises:
            IOError: If the file was not opened for writing.

        """
        self._check_writable()
        with self._seek_lock():
            self.on_modify()
            position = self._bytes_io.tell()
            new_size = self._bytes_io.truncate(size)
            if size is not None:
                # BytesIO.truncate never grows the buffer, so pad by hand,
                # regardless of where the cursor currently is.
                file_size = self._bytes_io.seek(0, os.SEEK_END)
                if file_size < size:
                    self._bytes_io.write(b"\0" * (size - file_size))
                # Restore the cursor so `_seek_lock` records the same pos.
                self._bytes_io.seek(position)
            return size or new_size

    def writable(self):
        # type: () -> bool
        """Return `True` if the handle was opened for writing."""
        return self._mode.writing

    def write(self, data):
        # type: (bytes) -> int
        """Write ``data`` at the current position.

        Returns:
            int: The value returned by the underlying buffer's ``write``.

        Raises:
            IOError: If the file was not opened for writing.

        """
        self._check_writable()
        with self._seek_lock():
            self.on_modify()
            return self._bytes_io.write(data)

    def writelines(self, sequence):  # type: ignore
        # type: (List[bytes]) -> None
        """Write every item of ``sequence`` at the current position.

        Raises:
            IOError: If the file was not opened for writing.

        """
        # FIXME(@althonos): For some reason the stub for IOBase.writelines
        #      is List[Any] ?! It should probably be Iterable[ByteString]
        self._check_writable()
        with self._seek_lock():
            self.on_modify()
            self._bytes_io.writelines(sequence)


class _DirEntry(object):
    """A node of the in-memory tree: a directory or a file.

    A directory node keeps its children, by name and in insertion
    order, in ``_dir``. A file node owns an ``io.BytesIO`` holding its
    data. Every node carries its own re-entrant ``lock`` and its
    created / accessed / modified timestamps.
    """

    def __init__(self, resource_type, name):
        # type: (ResourceType, Text) -> None
        now = time.time()

        self.resource_type = resource_type
        self.name = name
        self.lock = RLock()

        # All three timestamps start out identical.
        self.created_time = now
        self.accessed_time = now
        self.modified_time = now

        self._dir = OrderedDict()  # type: typing.MutableMapping[Text, _DirEntry]
        self._open_files = []  # type: typing.MutableSequence[_MemoryFile]
        # Only non-directory entries get a data buffer.
        self._bytes_file = (
            None if self.is_dir else io.BytesIO()
        )  # type: Optional[io.BytesIO]

    @property
    def bytes_file(self):
        # type: () -> Optional[io.BytesIO]
        """The data buffer of the entry, or `None` if it has none."""
        return self._bytes_file

    @property
    def is_dir(self):
        # type: () -> bool
        """`True` when the entry's resource type is a directory."""
        return self.resource_type == ResourceType.directory

    @property
    def size(self):
        # type: () -> int
        """The size in bytes of the entry's data (``0`` for directories)."""
        with self.lock:
            if not self.is_dir:
                # Measure the buffer by moving its cursor to the end.
                data = typing.cast(io.BytesIO, self._bytes_file)
                data.seek(0, os.SEEK_END)
                return data.tell()
            return 0

    @overload  # noqa: F811
    def get_entry(self, name, default):
        # type: (Text, _DirEntry) -> _DirEntry
        pass

    @overload  # noqa: F811
    def get_entry(self, name):
        # type: (Text) -> Optional[_DirEntry]
        pass

    @overload  # noqa: F811
    def get_entry(self, name, default):
        # type: (Text, None) -> Optional[_DirEntry]
        pass

    def get_entry(self, name, default=None):  # noqa: F811
        # type: (Text, Optional[_DirEntry]) -> Optional[_DirEntry]
        """Return the child called ``name``, or ``default`` if missing.

        Asserts that this entry is a directory.
        """
        assert self.is_dir, "must be a directory"
        children = self._dir
        return children.get(name, default)

    def set_entry(self, name, dir_entry):
        # type: (Text, _DirEntry) -> None
        """Store ``dir_entry`` as the child called ``name``."""
        children = self._dir
        children[name] = dir_entry

    def remove_entry(self, name):
        # type: (Text) -> None
        """Delete the child called ``name`` (`KeyError` if missing)."""
        children = self._dir
        del children[name]

    def __contains__(self, name):
        # type: (object) -> bool
        children = self._dir
        return name in children

    def __len__(self):
        # type: () -> int
        children = self._dir
        return len(children)

    def list(self):
        # type: () -> List[Text]
        """Return the names of the children, in insertion order."""
        return [child_name for child_name in self._dir.keys()]

    def add_open_file(self, memory_file):
        # type: (_MemoryFile) -> None
        """Record ``memory_file`` as an open handle on this entry."""
        handles = self._open_files
        handles.append(memory_file)

    def remove_open_file(self, memory_file):
        # type: (_MemoryFile) -> None
        """Forget ``memory_file`` (`ValueError` if it is not recorded)."""
        handles = self._open_files
        handles.remove(memory_file)


@six.python_2_unicode_compatible
class MemoryFS(FS):
    """A filesystem that is stored in memory.

    Memory filesystems are useful for caches, temporary data stores,
    unit testing, etc. Since all the data is in memory, they are very
    fast, but non-permanent. The `MemoryFS` constructor takes no
    arguments.

    Example:
        >>> mem_fs = MemoryFS()

    Or via an FS URL:
        >>> import fs
        >>> mem_fs = fs.open_fs('mem://')

    """

    _meta = {
        "case_insensitive": False,
        "invalid_path_chars": "\0",
        "network": False,
        "read_only": False,
        "thread_safe": True,
        "unicode_paths": True,
        "virtual": False,
    }  # type: Dict[Text, Union[Text, int, bool, None]]

    def __init__(self):
        # type: () -> None
        """Create an in-memory filesystem.
        """
        # Give the instance its own copy of the class-level metadata.
        self._meta = self._meta.copy()
        self.root = self._make_dir_entry(ResourceType.directory, "")
        super(MemoryFS, self).__init__()

    def __repr__(self):
        # type: () -> str
        return "MemoryFS()"

    def __str__(self):
        # type: () -> str
        return "<memfs>"

    def _make_dir_entry(self, resource_type, name):
        # type: (ResourceType, Text) -> _DirEntry
        """Create a new, unattached entry of the given type and name."""
        return _DirEntry(resource_type, name)

    def _get_dir_entry(self, dir_path):
        # type: (Text) -> Optional[_DirEntry]
        """Get a directory entry, or `None` if one doesn't exist.

        Despite the name, the entry returned for the last path component
        may be a file; only the intermediate components must be
        directories.
        """
        with self._lock:
            dir_path = normpath(dir_path)
            current_entry = self.root  # type: Optional[_DirEntry]
            for path_component in iteratepath(dir_path):
                if current_entry is None:
                    return None
                if not current_entry.is_dir:
                    return None
                current_entry = current_entry.get_entry(path_component)
            return current_entry

    def close(self):
        # type: () -> None
        """Drop the whole tree and close the filesystem."""
        if not self._closed:
            del self.root
        return super(MemoryFS, self).close()

    def getinfo(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Info
        """Get information about the resource at ``path``.

        The ``basic`` namespace is always returned; ``details`` is added
        when requested through ``namespaces``.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.

        """
        namespaces = namespaces or ()
        _path = self.validatepath(path)
        # Hold the lock so the entry cannot change while it is read.
        with self._lock:
            dir_entry = self._get_dir_entry(_path)
            if dir_entry is None:
                raise errors.ResourceNotFound(path)
            info = {"basic": {"name": dir_entry.name, "is_dir": dir_entry.is_dir}}
            if "details" in namespaces:
                info["details"] = {
                    "_write": ["accessed", "modified"],
                    "type": int(dir_entry.resource_type),
                    "size": dir_entry.size,
                    "accessed": dir_entry.accessed_time,
                    "modified": dir_entry.modified_time,
                    "created": dir_entry.created_time,
                }
            return Info(info)

    def listdir(self, path):
        # type: (Text) -> List[Text]
        """Return the names of the resources in the directory ``path``.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.
            fs.errors.DirectoryExpected: If ``path`` is not a directory.

        """
        self.check()
        _path = self.validatepath(path)
        with self._lock:
            dir_entry = self._get_dir_entry(_path)
            if dir_entry is None:
                raise errors.ResourceNotFound(path)
            if not dir_entry.is_dir:
                raise errors.DirectoryExpected(path)
            return dir_entry.list()

    if typing.TYPE_CHECKING:

        def opendir(self, path, factory=None):
            # type: (_M, Text, Optional[_OpendirFactory]) -> SubFS[_M]
            pass

    def makedir(
        self,  # type: _M
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[_M]
        """Make the directory ``path`` and return it via ``opendir``.

        ``permissions`` is accepted but not used by this implementation.

        Raises:
            fs.errors.DirectoryExists: If ``path`` already exists and
                ``recreate`` is false.
            fs.errors.ResourceNotFound: If the parent of ``path`` does
                not exist or is not a directory.

        """
        _path = self.validatepath(path)
        with self._lock:
            if _path == "/":
                if recreate:
                    return self.opendir(path)
                else:
                    raise errors.DirectoryExists(path)

            dir_path, dir_name = split(_path)

            parent_dir = self._get_dir_entry(dir_path)
            # A file cannot contain entries: treat it like a missing parent
            # instead of tripping the assertion in `_DirEntry.get_entry`.
            if parent_dir is None or not parent_dir.is_dir:
                raise errors.ResourceNotFound(path)

            dir_entry = parent_dir.get_entry(dir_name)
            if dir_entry is not None and not recreate:
                raise errors.DirectoryExists(path)

            if dir_entry is None:
                new_dir = self._make_dir_entry(ResourceType.directory, dir_name)
                parent_dir.set_entry(dir_name, new_dir)
            return self.opendir(path)

    def openbin(self, path, mode="r", buffering=-1, **options):
        # type: (Text, Text, int, **Any) -> BinaryIO
        """Open the file at ``path`` as a binary, in-memory file handle.

        ``buffering`` and ``options`` are accepted but not used by this
        implementation.

        Raises:
            fs.errors.FileExpected: If ``path`` has no file name or
                refers to a directory.
            fs.errors.FileExists: If the file exists and ``mode`` is
                exclusive.
            fs.errors.ResourceNotFound: If the parent directory does not
                exist, or the file does not exist and ``mode`` does not
                create it.

        """
        _mode = Mode(mode)
        _mode.validate_bin()
        _path = self.validatepath(path)
        dir_path, file_name = split(_path)

        if not file_name:
            raise errors.FileExpected(path)

        with self._lock:
            parent_dir_entry = self._get_dir_entry(dir_path)
            if parent_dir_entry is None or not parent_dir_entry.is_dir:
                raise errors.ResourceNotFound(path)

            file_dir_entry = parent_dir_entry.get_entry(file_name)
            if file_dir_entry is None:
                if not _mode.create:
                    raise errors.ResourceNotFound(path)
                file_dir_entry = self._make_dir_entry(ResourceType.file, file_name)
                parent_dir_entry.set_entry(file_name, file_dir_entry)
            elif _mode.create and _mode.exclusive:
                raise errors.FileExists(path)

            if file_dir_entry.is_dir:
                raise errors.FileExpected(path)

            mem_file = _MemoryFile(
                path=_path, memory_fs=self, mode=mode, dir_entry=file_dir_entry
            )
            # Registered so that closing the handle can unregister it.
            file_dir_entry.add_open_file(mem_file)
            return mem_file  # type: ignore

    def remove(self, path):
        # type: (Text) -> None
        """Remove the file at ``path``.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.
            fs.errors.FileExpected: If ``path`` is a directory.

        """
        _path = self.validatepath(path)

        with self._lock:
            dir_path, file_name = split(_path)
            parent_dir_entry = self._get_dir_entry(dir_path)

            if parent_dir_entry is None or file_name not in parent_dir_entry:
                raise errors.ResourceNotFound(path)

            file_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path))
            if file_dir_entry.is_dir:
                raise errors.FileExpected(path)

            parent_dir_entry.remove_entry(file_name)

    def removedir(self, path):
        # type: (Text) -> None
        """Remove the empty directory at ``path``.

        Raises:
            fs.errors.RemoveRootError: If ``path`` is the root.
            fs.errors.ResourceNotFound: If ``path`` does not exist.
            fs.errors.DirectoryExpected: If ``path`` is not a directory.
            fs.errors.DirectoryNotEmpty: If the directory has entries.

        """
        _path = self.validatepath(path)

        if _path == "/":
            raise errors.RemoveRootError()

        with self._lock:
            dir_path, file_name = split(_path)
            parent_dir_entry = self._get_dir_entry(dir_path)

            if parent_dir_entry is None or file_name not in parent_dir_entry:
                raise errors.ResourceNotFound(path)

            dir_dir_entry = typing.cast(_DirEntry, self._get_dir_entry(_path))
            if not dir_dir_entry.is_dir:
                raise errors.DirectoryExpected(path)

            if len(dir_dir_entry):
                raise errors.DirectoryNotEmpty(path)

            parent_dir_entry.remove_entry(file_name)

    def setinfo(self, path, info):
        # type: (Text, RawInfo) -> None
        """Set the ``accessed`` / ``modified`` details of a resource.

        Only the ``accessed`` and ``modified`` keys of the ``details``
        namespace are applied; anything else in ``info`` is ignored.

        Raises:
            fs.errors.ResourceNotFound: If ``path`` does not exist.

        """
        _path = self.validatepath(path)
        with self._lock:
            # Resolve the entry directly so the root ("/"), which has no
            # name inside a parent directory, is found as well.
            resource_entry = self._get_dir_entry(_path)
            if resource_entry is None:
                raise errors.ResourceNotFound(path)

            if "details" in info:
                details = info["details"]
                if "accessed" in details:
                    resource_entry.accessed_time = details["accessed"]  # type: ignore
                if "modified" in details:
                    resource_entry.modified_time = details["modified"]  # type: ignore
